<?php
namespace PHPDaemon\Clients\MySQL;

use PHPDaemon\Core\Daemon;
use PHPDaemon\Network\ClientConnection;
use PHPDaemon\Structures\StackCallbacks;
use PHPDaemon\Utils\Binary;

class Connection extends ClientConnection
{

    /**
     * @var integer Sequence. Pointer of packet sequence (written into the header of the next outgoing packet)
     */
    public $seq = 0;

    /**
     * @var integer Client flags. The initial value is replaced in auth() by a combination of Pool::CLIENT_* flags
     */
    public $clientFlags = 239237;

    /**
     * @var integer Thread ID read from the server's initial handshake packet
     */
    public $threadId;

    /**
     * @var string Scramble read from the server's initial handshake packet; used to build the auth. token
     */
    public $scramble;

    /**
     * @var string Server version string read from the server's initial handshake packet
     */
    public $serverver;

    /**
     * @var integer Two-byte value read from the handshake packet (presumably the server capability flags)
     */
    public $serverCaps;

    /**
     * @var integer One-byte value read from the handshake packet (presumably the server language/charset)
     */
    public $serverLang;

    /**
     * @var integer Server flags: http://dev.mysql.com/doc/internals/en/status-flags.html
     */
    public $serverStatus;

    /**
     * @var integer Number of warnings generated by the command (taken from the last OK packet)
     */
    public $warnCount;

    /**
     * @var string Trailing message text of the last OK packet
     */
    public $message;

    /**
     * @var integer Charset number (see MySQL charset list); sent to the server in the auth. packet
     */
    public $charsetNumber = 0x21;

    /**
     * @var string User name
     */
    protected $user = 'root';

    /**
     * @var string Password. An empty string means that no auth. token is sent
     */
    protected $password = '';

    /**
     * @var string Database name. When not empty, it is selected with USE right after a successful handshake
     */
    public $dbname = '';

    /**
     * Read state: waiting for the 4-byte packet header (3-byte body size + 1-byte sequence number)
     */
    const STATE_STANDBY = 0;

    /**
     * Read state: the packet header has been parsed, waiting for the complete packet body
     */
    const STATE_BODY = 1;

    /**
     * @var integer Handshake phase: 0 until the first server packet arrives, then one of the PHASE_* constants
     */
    protected $phase = 0;

    /**
     * Phase: the first packet from the server has been received
     */
    const PHASE_GOT_INIT = 1;

    /**
     * Phase: auth() has sent the auth. packet and the response is pending
     */
    const PHASE_AUTH_SENT = 2;

    /**
     * Phase: an error packet arrived before the handshake was completed; the connection gets finished
     */
    const PHASE_AUTH_ERR = 3;

    /**
     * Phase: the server acknowledged the auth. packet with an OK packet; commands may be sent
     */
    const PHASE_HANDSHAKED = 4;

    /**
     * @var integer State of pointer of incoming data. 0 - Result Set Header Packet, 1 - Field Packet, 2 - Row Packet
     */
    protected $rsState = 0;

    /**
     * Result set state: the next data packet is the Result Set Header Packet
     */
    const RS_STATE_HEADER = 0;

    /**
     * Result set state: the next data packets are Field Packets
     */
    const RS_STATE_FIELD = 1;

    /**
     * Result set state: the next data packets are Row Packets
     */
    const RS_STATE_ROW = 2;

    /**
     * @var integer Packet size (body size taken from the header of the packet currently being read)
     */
    protected $pctSize = 0;

    /**
     * @var array Result rows. Each row is an array keyed by field name; emptied after the response callback ran
     */
    public $resultRows = [];

    /**
     * @var array Result fields. Each entry describes one column (catalog, db, table, org_table, name, org_name,
     *            charset, length, type, flags, decimals); emptied after the response callback ran
     */
    public $resultFields = [];

    /**
     * @var object Property holds a reference to user's object
     */
    public $context;

    /**
     * @var integer INSERT_ID() (taken from the last OK packet)
     */
    public $insertId;

    /**
     * @var integer Affected rows (taken from the last OK packet)
     */
    public $affectedRows;

    /**
     * @var integer Protocol version (first byte of the server's initial handshake packet)
     */
    public $protover = 0;

    /**
     * @var integer Timeout
     */
    public $timeout = 120;

    /**
     * @var integer Error number. Only set while onError() runs; reset to 0 afterwards
     */
    public $errno = 0;

    /**
     * @var string Error message. Only set while onError() runs; reset to '' afterwards
     */
    public $errmsg = '';

    /**
     * @var integer Low mark
     */
    protected $lowMark = 4;

    /**
     * Runs the given callback as soon as the outcome of the handshake is known.
     * If the handshake already succeeded or failed, the callback is invoked immediately;
     * otherwise it is queued until the response to the auth. packet arrives.
     * @param  callable $cb Callback
     * @callback $cb ( Connection $conn, boolean $success )
     * @return void
     */
    public function onConnected($cb)
    {
        // Handshake already completed
        if ($this->phase === self::PHASE_HANDSHAKED) {
            $cb($this, true);
            return;
        }

        // Handshake already failed
        if ($this->phase === self::PHASE_AUTH_ERR) {
            $cb($this, false);
            return;
        }

        // Still in progress: create the queue on first use and remember the callback
        if (!$this->onConnected) {
            $this->onConnected = new StackCallbacks();
        }
        $this->onConnected->push($cb);
    }

    /**
     * Called when the connection is established at low level and the peer is ready to receive data.
     * Uses the path of the connection as database name unless a database name has been set already.
     * @return void
     */
    public function onReady()
    {
        if (!mb_orig_strlen($this->path)) {
            // no path given, nothing to take over
            return;
        }

        if (mb_orig_strlen($this->dbname)) {
            // an explicitly configured database name wins
            return;
        }

        $this->dbname = $this->path;
    }

    /**
     * Sends a packet: 3-byte payload size, 1-byte sequence number, then the payload.
     * The sequence number is incremented after it has been used.
     * @param  string $packet Data
     * @return boolean         Success
     */
    public function sendPacket($packet)
    {
        //Daemon::log('Client --> Server: ' . Debug::exportBytes($packet) . "\n\n");
        $size = Binary::int2bytes(3, mb_orig_strlen($packet), true);
        $sequence = chr($this->seq++);

        return $this->write($size . $sequence . $packet);
    }

    /**
     * Builds length-encoded binary string: the byte length of the string as a
     * MySQL length-encoded integer, followed by the string itself.
     *
     * Prefix layout:
     *   - null               -> single byte 0xFB (NULL marker)
     *   - length <= 250      -> single byte holding the length
     *   - length <= 0xFFFF   -> 0xFC followed by the length in 2 bytes
     *   - length <= 0xFFFFFF -> 0xFD followed by the length in 3 bytes
     *   - otherwise          -> 0xFE followed by the length in 8 bytes
     *
     * @param  string|null $s String, or null to build the NULL marker
     * @return string         Resulting binary string
     */
    public function buildLenEncodedBinary($s)
    {
        // NOTE: the markers are written as hex escapes on purpose. In a double-quoted
        // PHP string "\251" is an OCTAL escape (byte 0xA9), not the byte 251.
        if ($s === null) {
            return "\xFB";
        }

        $l = mb_orig_strlen($s);

        if ($l <= 250) {
            return chr($l) . $s;
        }

        if ($l <= 0xFFFF) {
            return "\xFC" . Binary::int2bytes(2, $l, true) . $s;
        }

        if ($l <= 0xFFFFFF) {
            return "\xFD" . Binary::int2bytes(3, $l, true) . $s;
        }

        return "\xFE" . Binary::int2bytes(8, $l, true) . $s;
    }

    /**
     * Parses length-encoded binary integer from the input.
     * The first byte is either the value itself (0..250) or a marker:
     * 251 yields null, 255 yields false, 252 and 253 announce a 2- and 3-byte integer,
     * every other marker is followed by an 8-byte integer.
     * @return integer|null|false Result
     */
    public function parseEncodedBinary()
    {
        $marker = ord($this->read(1));

        if ($marker <= 250) {
            // the byte is the value itself
            return $marker;
        }

        switch ($marker) {
            case 251:
                return null;

            case 255:
                return false;

            case 252:
                $width = 2;
                break;

            case 253:
                $width = 3;
                break;

            default:
                $width = 8;
        }

        return Binary::bytes2int($this->read($width), true);
    }

    /**
     * Parse length-encoded string: reads the length prefix, then that many bytes.
     * A prefix that parses to null or false is returned as is, without reading any data.
     * @return string|null|false Result
     */
    public function parseEncodedString()
    {
        $length = $this->parseEncodedBinary();

        $hasData = ($length !== null) && ($length !== false);

        return $hasData ? $this->read($length) : $length;
    }

    /**
     * Generates auth. token:
     * SHA1(scramble . SHA1(SHA1(password))) XOR SHA1(password), all digests in raw binary form.
     * @param  string $scramble Scramble string
     * @param  string $password Password
     * @return string           Result
     */
    public function getAuthToken($scramble, $password)
    {
        $passwordHash = sha1($password, true);
        $doubleHash = sha1($passwordHash, true);
        $mask = sha1($scramble . $doubleHash, true);

        return $mask ^ $passwordHash;
    }

    /**
     * Sends auth. packet.
     * Does nothing unless the server's initial packet has just been received (PHASE_GOT_INIT).
     * Queues a response callback that notifies the onConnected() listeners about the outcome.
     * @return void
     */
    public function auth()
    {
        if ($this->phase !== self::PHASE_GOT_INIT) {
            return;
        }
        $this->phase = self::PHASE_AUTH_SENT;

        // The response to the auth. packet decides whether the connection is usable
        $this->onResponse->push(function ($conn, $result) {
            if (!$conn->onConnected) {
                return;
            }
            $conn->connected = true;
            $conn->onConnected->executeAll($conn, $result);
            $conn->onConnected = null;
        });

        $this->clientFlags = Pool::CLIENT_LONG_PASSWORD
            | Pool::CLIENT_LONG_FLAG
            | Pool::CLIENT_LOCAL_FILES
            | Pool::CLIENT_PROTOCOL_41
            | Pool::CLIENT_INTERACTIVE
            | Pool::CLIENT_TRANSACTIONS
            | Pool::CLIENT_SECURE_CONNECTION
            | Pool::CLIENT_MULTI_STATEMENTS
            | Pool::CLIENT_MULTI_RESULTS;

        // client flags, max. packet size, charset number
        $head = pack('VVc', $this->clientFlags, $this->pool->maxAllowedPacket, $this->charsetNumber);

        // zero filler
        $filler = "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00";

        // nul-terminated user name
        $login = $this->user . "\x00";

        // a single zero byte if there is no password, length-encoded token otherwise
        if ($this->password === '') {
            $token = "\x00";
        } else {
            $token = $this->buildLenEncodedBinary(
                $this->getAuthToken($this->scramble, $this->password)
            );
        }

        // optional nul-terminated path
        $database = ($this->path !== '') ? $this->path . "\x00" : '';

        $this->sendPacket($head . $filler . $login . $token . $database);
    }

    /**
     * Sends SQL-query (COM_QUERY command)
     * @param  string $q Query
     * @param  callable $cb Optional. Callback called when response received
     * @callback $cb ( Connection $conn, boolean $success )
     * @throws ConnectionFinished If the connection is finished already
     * @return boolean            Success
     */
    public function query($q, $cb = null)
    {
        if (!$this->finished) {
            return $this->command(Pool::COM_QUERY, $q, $cb);
        }

        throw new ConnectionFinished;
    }

    /**
     * Begins a transaction: sends BEGIN, then acquires the connection
     * @param  callable $cb Optional. Callback called when response received
     * @callback $cb ( Connection $conn, boolean $success )
     * @throws ConnectionFinished
     * @return void
     */
    public function begin($cb = null)
    {
        $statement = 'BEGIN';
        $this->query($statement, $cb);

        $this->acquire();
    }

    /**
     * Commits a transaction: sends COMMIT, then releases the connection
     * @param  callable $cb Optional. Callback called when response received
     * @callback $cb ( Connection $conn, boolean $success )
     * @throws ConnectionFinished
     * @return void
     */
    public function commit($cb = null)
    {
        $statement = 'COMMIT';
        $this->query($statement, $cb);

        $this->release();
    }

    /**
     * Rolls back a transaction: sends ROLLBACK, then releases the connection
     * @param  callable $cb Optional. Callback called when response received
     * @callback $cb ( Connection $conn, boolean $success )
     * @throws ConnectionFinished
     * @return void
     */
    public function rollback($cb = null)
    {
        $statement = 'ROLLBACK';
        $this->query($statement, $cb);

        $this->release();
    }

    /**
     * Sends echo-request (COM_PING command without payload)
     * @param  callable $cb Optional. Callback called when response received
     * @callback $cb ( Connection $conn, boolean $success )
     * @return boolean            Success
     */
    public function ping($cb = null)
    {
        $sent = $this->command(Pool::COM_PING, '', $cb);

        return $sent;
    }

    /**
     * Sends arbitrary command.
     * Nothing is sent and the callback is not queued unless the connection is handshaked.
     * @param  string $cmd Command
     * @param  string $q Data
     * @param  callable $cb Optional
     * @throws ConnectionFinished
     * @callback $cb ( Connection $conn, boolean $success )
     * @return boolean            Success
     */
    public function command($cmd, $q = '', $cb = null)
    {
        if ($this->phase === self::PHASE_HANDSHAKED) {
            // queue the callback first, then send the command as packet #0 of a new sequence
            $this->onResponse->push($cb);
            $this->seq = 0;
            $this->sendPacket(chr($cmd) . $q);

            return true;
        }

        return false;
    }

    /**
     * Sets default database name.
     * The name is always remembered in $dbname. Unless the connection is in PHASE_GOT_INIT,
     * a USE statement is issued through query() and its result is returned.
     * @param  string $name Database name
     * @throws ConnectionFinished If the connection is finished already
     * @return boolean         Success
     */
    public function selectDB($name)
    {
        $this->dbname = $name;

        if ($this->phase !== self::PHASE_GOT_INIT) {
            // A backtick inside a backtick-quoted identifier has to be doubled, otherwise
            // the name would terminate the quoting and corrupt the statement
            return $this->query('USE `' . str_replace('`', '``', $name) . '`');
        }

        return true;
    }

    /**
     * Called when new data received.
     * Processes as many complete packets as the input buffer holds. For each packet the 4-byte
     * header (3-byte body size + 1-byte sequence number) is read first, then the body.
     * The very first packet (phase 0) is parsed as the server's handshake and answered by auth().
     * Every later packet is dispatched by its first body byte: 0xFF error packet, 0x00 OK packet,
     * 0xFE EOF packet, anything else is result set data interpreted according to $rsState.
     * @return void
     */
    public function onRead()
    {
        // Entry point of the per-packet loop (see "goto packet" at the end)
        packet:
        if ($this->state === self::STATE_STANDBY) {
            // Need the complete 4-byte header before anything can be parsed
            if ($this->bev->input->length < 4) {
                return;
            }
            $this->pctSize = Binary::bytes2int($this->read(3), true);
            // Do not get called again before the whole body is buffered
            $this->setWatermark($this->pctSize, $this->pctSize);
            $this->state = self::STATE_BODY;
            // The next packet we send continues the sequence of the received one
            $this->seq = ord($this->read(1)) + 1;
        }
        /* STATE_BODY */
        // $l is the buffer length at the start of the body; "$this->pctSize - $l + current length"
        // below therefore equals the number of body bytes that have not been consumed yet
        $l = $this->bev->input->length;
        if ($l < $this->pctSize) {
            // body is still incomplete
            return;
        }
        $this->state = self::STATE_STANDBY;
        $this->setWatermark(4);
        if ($this->phase === 0) {
            // First packet from the server
            $this->phase = self::PHASE_GOT_INIT;
            $this->protover = ord($this->read(1));
            if ($this->protover === 0xFF) { // error
                // The server answered with an error packet instead of a handshake.
                // Queue the onConnected notifier so that listeners learn about the failure,
                // then continue with the generic error packet handling
                $fieldCount = $this->protover;
                $this->protover = 0;
                $this->onResponse->push(function ($conn, $result) {
                    if ($conn->onConnected) {
                        $conn->connected = true;
                        $conn->onConnected->executeAll($conn, $result);
                        $conn->onConnected = null;
                    }
                });
                goto field;
            }
            if (($p = $this->search("\x00")) === false) {
                $this->log('nul-terminator of \'serverver\' is not found');
                $this->finish();
                return;
            }
            $this->serverver = $this->read($p);
            $this->drain(1); // drain nul-byte
            $this->threadId = Binary::bytes2int($this->read(4), true);
            // first 8 bytes of the scramble
            $this->scramble = $this->read(8);
            $this->drain(1); // one byte skipped; presumably the filler after the first scramble part - TODO confirm

            $this->serverCaps = Binary::bytes2int($this->read(2), true);
            $this->serverLang = ord($this->read(1));
            $this->serverStatus = Binary::bytes2int($this->read(2), true);
            // 13 bytes between the status field and the rest of the scramble are not used
            $this->drain(13);
            // remaining 12 bytes of the scramble
            $restScramble = $this->read(12);
            $this->scramble .= $restScramble;
            $this->drain(1);

            $this->auth();
        } else {
            $fieldCount = ord($this->read(1));
            // Jump target for the error case of the first packet
            field:
            if ($fieldCount === 0xFF) {
                // Error packet
                $u = unpack('v', $this->read(2));
                $this->errno = $u[1];
                // 6 bytes read and not used; presumably the SQL state marker and code - TODO confirm
                $state = $this->read(6);
                // the rest of the packet is the error message
                $this->errmsg = $this->read($this->pctSize - $l + $this->bev->input->length);
                $this->onError();
                // errno/errmsg are only valid while onError() runs
                $this->errno = 0;
                $this->errmsg = '';
            } elseif ($fieldCount === 0x00) {
                // OK Packet Empty
                if ($this->phase === self::PHASE_AUTH_SENT) {
                    // the auth. packet was accepted
                    $this->phase = self::PHASE_HANDSHAKED;

                    // select the configured database right away
                    if ($this->dbname !== '') {
                        $this->query('USE `' . $this->dbname . '`');
                    }
                }

                $this->affectedRows = $this->parseEncodedBinary();

                $this->insertId = $this->parseEncodedBinary();

                $u = unpack('v', $this->read(2));
                $this->serverStatus = $u[1];

                $u = unpack('v', $this->read(2));
                $this->warnCount = $u[1];

                // the rest of the packet is the message
                $this->message = $this->read($this->pctSize - $l + $this->bev->input->length);
                $this->onResultDone();
            } elseif ($fieldCount === 0xFE) {
                // EOF Packet
                if ($this->rsState === self::RS_STATE_ROW) {
                    // EOF after the rows: the result set is complete
                    $this->onResultDone();
                } else {
                    // otherwise advance to the next result set state
                    ++$this->rsState;
                }
            } else {
                // Data packet
                // The byte read above belongs to the data, put it back
                $this->prependInput(chr($fieldCount));

                if ($this->rsState === self::RS_STATE_HEADER) {
                    // Result Set Header Packet
                    // the value is read and not used
                    $extra = $this->parseEncodedBinary();
                    $this->rsState = self::RS_STATE_FIELD;
                } elseif ($this->rsState === self::RS_STATE_FIELD) {
                    // Field Packet
                    $field = [
                        'catalog' => $this->parseEncodedString(),
                        'db' => $this->parseEncodedString(),
                        'table' => $this->parseEncodedString(),
                        'org_table' => $this->parseEncodedString(),
                        'name' => $this->parseEncodedString(),
                        'org_name' => $this->parseEncodedString()
                    ];

                    $this->drain(1); // filler

                    $u = unpack('v', $this->read(2));

                    $field['charset'] = $u[1];
                    $u = unpack('V', $this->read(4));
                    $field['length'] = $u[1];

                    $field['type'] = ord($this->read(1));

                    $u = unpack('v', $this->read(2));
                    $field['flags'] = $u[1];

                    $field['decimals'] = ord($this->read(1));

                    $this->resultFields[] = $field;
                } elseif ($this->rsState === self::RS_STATE_ROW) {
                    // Row Packet
                    $row = [];

                    // one length-encoded string per field, stored under the name of the field
                    for ($i = 0, $nf = sizeof($this->resultFields); $i < $nf; ++$i) {
                        $row[$this->resultFields[$i]['name']] = $this->parseEncodedString();
                    }

                    $this->resultRows[] = $row;
                }
            }
        }
        // The connection may have been finished while the packet was handled
        if ($this->finished) {
            return;
        }
        $this->drain($this->pctSize - $l + $this->bev->input->length); // drain the rest of packet
        goto packet;
    }

    /**
     * Called when connection finishes.
     * Attempts to send COM_QUIT before handing over to the parent implementation.
     * @return void
     */
    public function onFinish()
    {
        // command() itself refuses to send unless the connection is handshaked
        $this->command(Pool::COM_QUIT);

        parent::onFinish();
    }

    /**
     * Called when the whole result received.
     * Resets the result set state, runs the oldest pending response callback with success = true
     * and afterwards drops the collected fields and rows.
     * @return void
     */
    public function onResultDone()
    {
        // the next data packet starts a new result set
        $this->rsState = self::RS_STATE_HEADER;

        // the callback still sees resultRows / resultFields
        $this->onResponse->executeOne($this, true);
        $this->checkFree();

        $this->resultFields = [];
        $this->resultRows = [];
    }

    /**
     * Called when error occurred.
     * Resets the result set state, runs the oldest pending response callback with success = false
     * and drops the collected fields and rows. An error during the handshake marks the
     * connection as failed and finishes it. The error is logged in any case.
     * @return void
     */
    public function onError()
    {
        // the next data packet starts a new result set
        $this->rsState = self::RS_STATE_HEADER;

        $this->onResponse->executeOne($this, false);
        $this->checkFree();

        $this->resultFields = [];
        $this->resultRows = [];

        $handshakePending = in_array(
            $this->phase,
            [self::PHASE_AUTH_SENT, self::PHASE_GOT_INIT],
            true
        );

        if ($handshakePending) {
            // in case of auth error
            $this->phase = self::PHASE_AUTH_ERR;
            $this->finish();
        }

        Daemon::log(__METHOD__ . ' #' . $this->errno . ': ' . $this->errmsg);
    }
}
